package com.pw.study.flink.chapter2;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Small runnable demos of Flink DataStream transformations:
 * keyed rolling aggregation ({@code sum}), {@code union} and {@code connect}.
 *
 * <p>Each demo builds its own execution environment, wires a tiny bounded
 * source built with {@code fromElements} to a {@code print} sink and runs the job.
 */
public class DealTransform {

    /** Value written to the {@code rest.port} configuration option of every demo environment. */
    private static final int REST_PORT = 20000;

    public static void main(String[] args) {
        // Only one demo is enabled at a time; switch the call below to try another one.
        //dealConnect();

        //dealUnion();
        dealSumAndSimple();
    }

    /**
     * Keys a stream of integers by parity and prints the rolling sum of each group.
     */
    private static void dealSumAndSimple() {
        StreamExecutionEnvironment env = createEnv();
        DataStreamSource<Integer> stream = env.fromElements(1, 20, 3, 4, 5);

        // "偶数" = even, "奇数" = odd. The labels were previously swapped, so even
        // numbers were grouped under the "odd" key and vice versa.
        KeyedStream<Integer, String> kbStream = stream.keyBy(ele -> ele % 2 == 0 ? "偶数" : "奇数");
        //kbStream.print("init");

        // Position 0 addresses the element itself because the stream holds plain Integers.
        kbStream.sum(0).print("sum");
        //kbStream.max(0).print("max");
        //kbStream.min(0).print("min");

        runJob(env);
    }

    /**
     * Merges three integer streams into a single stream with {@code union} and prints it.
     *
     * <p>All streams taking part in a union must carry the same element type;
     * after the union they are fully blended into one stream.
     */
    private static void dealUnion() {
        StreamExecutionEnvironment env = createEnv();
        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
        DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);

        stream1
                .union(stream2)
                .union(stream3)
                .print();

        runJob(env);
    }

    /**
     * Connects an integer stream with a string stream and prints both inputs.
     *
     * <ol>
     *   <li>The two streams may carry different element types.</li>
     *   <li>They are only joined mechanically; internally they remain two separate streams.</li>
     *   <li>Exactly two streams can be connected; a third one cannot take part.</li>
     * </ol>
     */
    private static void dealConnect() {
        StreamExecutionEnvironment env = createEnv();
        DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");

        // Connected in appearance only: each input keeps its own type and is printed separately.
        ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
        cs.getFirstInput().print("first");
        cs.getSecondInput().print("second");

        runJob(env);
    }

    /**
     * Creates the execution environment shared by all demos, with
     * {@code rest.port} set to {@link #REST_PORT}.
     */
    private static StreamExecutionEnvironment createEnv() {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", REST_PORT);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }

    /**
     * Executes the job defined on {@code env}.
     *
     * <p>A failure is reported on stderr instead of being propagated, so a
     * broken demo never throws out of {@code main}.
     */
    private static void runJob(StreamExecutionEnvironment env) {
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
